
Conversation

@brfrn169 brfrn169 (Collaborator) commented Jun 4, 2025

Description

This PR adds support for scan fetch size in the storage adapters. It introduces a new property, scalar.db.scan_fetch_size, that controls the number of records fetched by the scan API in the storage layer, and implements it in all the storage adapters.

Related issues and/or PRs

N/A

Changes made

  • Added a new property scalar.db.scan_fetch_size.
  • Added support for the scan fetch size in all the storage adapters.

Checklist

The following is a best-effort checklist. If any items in this checklist are not applicable to this PR or are dependent on other, unmerged PRs, please still mark the checkboxes after you have read and understood each item.

  • I have commented my code, particularly in hard-to-understand areas.
  • I have updated the documentation to reflect the changes.
  • I have considered whether similar issues could occur in other products, components, or modules if this PR is for bug fixes.
  • Any remaining open issues linked to this PR are documented and up-to-date (Jira, GitHub, etc.).
  • Tests (unit, integration, etc.) have been added for the changes.
  • My changes generate no new warnings.
  • Any dependent changes in other PRs have been merged and published.

Additional notes (optional)

N/A

Release notes

Added support for scan fetch size in the storage adapters. You can control the number of records fetched by the scan API in the storage layer by configuring the scalar.db.scan_fetch_size property.
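For instance, enabling a larger fetch size would look like the following configuration sketch (only the property name comes from this PR; the value 100 and the file name scalar-db.properties are illustrative):

```properties
# Example only: scalar.db.scan_fetch_size is the property added by this PR;
# the value is illustrative.
scalar.db.scan_fetch_size=100
```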

@brfrn169 brfrn169 requested a review from Copilot June 4, 2025 00:09
@brfrn169 brfrn169 self-assigned this Jun 4, 2025
@brfrn169 brfrn169 added the enhancement New feature or request label Jun 4, 2025
Copilot AI (Contributor) left a comment

Pull Request Overview

This PR introduces a configurable scanner fetch size across all storage adapters, allowing users to control how many records are fetched per scan/query operation.

  • DatabaseConfig now reads scalar.db.scanner_fetch_size (default 10).
  • All storage handlers (JDBC, Cassandra, DynamoDB, Cosmos) accept and propagate the fetch size into their underlying clients/requests.
  • QueryScanner and the JDBC scanner implementation have been updated to use the fetch size, and the corresponding tests have been updated accordingly.
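Conceptually, the fetch size bounds how many records come back per storage round trip, while the scanner keeps iterating across pages until the scan is exhausted. A minimal, storage-agnostic Java sketch of this paging behavior (illustrative only, not ScalarDB code; all names here are made up):

```java
import java.util.ArrayList;
import java.util.List;

// Storage-agnostic sketch of fetch-size paging; not actual ScalarDB code.
public class PagingSketch {
  // Returns one "page" of at most fetchSize records starting at offset,
  // standing in for a single round trip to the storage.
  static List<Integer> fetchPage(List<Integer> all, int offset, int fetchSize) {
    int end = Math.min(offset + fetchSize, all.size());
    return new ArrayList<>(all.subList(offset, end));
  }

  // Drains all pages, as a scanner would when iterated to the end.
  static List<Integer> scanAll(List<Integer> all, int fetchSize) {
    List<Integer> results = new ArrayList<>();
    int offset = 0;
    while (offset < all.size()) {
      List<Integer> page = fetchPage(all, offset, fetchSize);
      results.addAll(page);
      offset += page.size();
    }
    return results;
  }

  public static void main(String[] args) {
    List<Integer> records = new ArrayList<>();
    for (int i = 0; i < 25; i++) {
      records.add(i);
    }
    // With fetchSize 10, this takes 3 round trips: 10, 10, and 5 records.
    List<Integer> scanned = scanAll(records, 10);
    System.out.println(scanned.size()); // prints 25
  }
}
```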

Reviewed Changes

Copilot reviewed 23 out of 23 changed files in this pull request and generated 2 comments.

Summary per file:

  • core/src/main/java/com/scalar/db/config/DatabaseConfig.java: Load and expose scannerFetchSize from properties
  • core/src/main/java/com/scalar/db/storage/jdbc/JdbcService.java: Pass scannerFetchSize into PreparedStatement.setFetchSize in the 3-param scanner, but missing in the 2-param version
  • core/src/main/java/com/scalar/db/storage/jdbc/ScannerImpl.java: Rename close flag, commit on close, updated exception details
  • core/src/main/java/com/scalar/db/storage/dynamo/QueryScanner.java: Apply fetchSize to paginated requests
  • core/src/main/java/com/scalar/db/storage/cosmos/SelectStatementHandler.java: Propagate fetchSize into Cosmos query options
  • core/src/main/java/com/scalar/db/storage/cassandra/SelectStatementHandler.java: Set driver fetch size on bound statements
  • core/src/test/java/com/scalar/db/config/DatabaseConfigTest.java: Add tests for default/explicit scanner fetch size
  • core/src/test/java/com/scalar/db/storage/**: Update tests in all adapters to assert correct use of fetch size
Comments suppressed due to low confidence (2)

core/src/main/java/com/scalar/db/storage/jdbc/JdbcService.java:100

  • The two-parameter getScanner method does not set the fetch size on the PreparedStatement, so the configured scannerFetchSize is ignored when calling getScanner(scan, connection). Consider adding preparedStatement.setFetchSize(scannerFetchSize) before executing the query.
PreparedStatement preparedStatement = connection.prepareStatement(selectQuery.sql());

core/src/test/java/com/scalar/db/storage/jdbc/SelectStatementHandlerTest.java:610

  • No test covers the two-parameter getScanner method in JdbcService to verify that the fetchSize is applied correctly. Consider adding a unit test to simulate getScanner(scan, connection) and assert that PreparedStatement.setFetchSize is called with the configured scannerFetchSize.
}


systemNamespaceName = getSystemNamespaceName(getProperties());

scannerFetchSize = getInt(getProperties(), SCANNER_FETCH_SIZE, DEFAULT_SCANNER_FETCH_SIZE);
Copilot AI Jun 4, 2025

Missing validation for scannerFetchSize from configuration. Negative or zero values could lead to unexpected behavior. Consider enforcing scannerFetchSize > 0 and throwing an exception on invalid values.
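The validation suggested here could be sketched as follows; this is an illustrative stand-alone example, not the actual ScalarDB implementation, and the class and method names are made up:

```java
// Illustrative validation sketch; not actual ScalarDB code.
public class FetchSizeValidation {
  static int validateFetchSize(int value) {
    if (value <= 0) {
      // Reject zero and negative values up front, as the review suggests,
      // instead of passing them on to the underlying driver.
      throw new IllegalArgumentException(
          "scalar.db.scan_fetch_size must be a positive integer, but was: " + value);
    }
    return value;
  }

  public static void main(String[] args) {
    System.out.println(validateFetchSize(10)); // prints 10
    try {
      validateFetchSize(0);
    } catch (IllegalArgumentException e) {
      System.out.println("rejected: " + e.getMessage());
    }
  }
}
```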

}
}

@Override
Copilot AI Jun 4, 2025

[nitpick] Add an @Override annotation to the close() method to clarify that it implements the Closeable/AutoCloseable interface and to catch mismatches in the method signature.


public static final String CROSS_PARTITION_SCAN_FILTERING = SCAN_PREFIX + "filtering.enabled";
public static final String CROSS_PARTITION_SCAN_ORDERING = SCAN_PREFIX + "ordering.enabled";
public static final String SYSTEM_NAMESPACE_NAME = PREFIX + "system_namespace_name";
public static final String SCANNER_FETCH_SIZE = PREFIX + "scanner_fetch_size";
@brfrn169 brfrn169 (Collaborator, Author) Jun 4, 2025

Added a new property scalar.db.scan_fetch_size.

@Override
@Nonnull
protected ResultSet execute(BoundStatement bound, Operation operation) {
bound.setFetchSize(fetchSize);
Collaborator Author

For Cassandra, call Statement.setFetchSize().

Collaborator Author

This is the main change for Dynamo.

Connection connection = null;
try {
connection = dataSource.getConnection();
connection.setAutoCommit(false);
Collaborator Author

For JDBC, call Connection.setAutoCommit(false).

SelectQuery selectQuery = buildSelectQuery(scan, tableMetadata);
PreparedStatement preparedStatement = connection.prepareStatement(selectQuery.sql());
selectQuery.bind(preparedStatement);
preparedStatement.setFetchSize(scannerFetchSize);
Collaborator Author

Then, call Statement.setFetchSize() as well for JDBC.


@Override
public Map<String, String> getConnectionProperties() {
return Collections.singletonMap("useCursorFetch", "true");
Collaborator Author

Also, for MySQL, we need to set useCursorFetch=true in the connection properties.
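The adapter applies this automatically through getConnectionProperties(), but the effect is equivalent to appending the flag to the JDBC URL by hand. A hypothetical configuration sketch (the host, port, and database name are placeholders, and whether you put the flag in the URL is an assumption; the adapter does not require it):

```properties
# Placeholder URL for illustration; the JDBC adapter sets useCursorFetch=true
# on the connection automatically, so this manual form is not required.
scalar.db.contact_points=jdbc:mysql://localhost:3306/mydb?useCursorFetch=true
```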

Comment on lines +337 to +338
.queryItems(query, queryOptions.setMaxBufferedItemCount(fetchSize), Record.class)
.iterableByPage(fetchSize)
Collaborator Author

This is the main change for Cosmos.

@feeblefakie feeblefakie (Contributor) left a comment

Overall, looking good to me!
Left a minor naming suggestion. PTAL!

public static final String CROSS_PARTITION_SCAN_FILTERING = SCAN_PREFIX + "filtering.enabled";
public static final String CROSS_PARTITION_SCAN_ORDERING = SCAN_PREFIX + "ordering.enabled";
public static final String SYSTEM_NAMESPACE_NAME = PREFIX + "system_namespace_name";
public static final String SCANNER_FETCH_SIZE = PREFIX + "scanner_fetch_size";
Contributor

Probably, it should be scan_fetch_size since a scanner is more like an iterator for results that are scanned with the specified fetch size?

Collaborator Author

Thanks! Fixed in 877ddbf.

@brfrn169 brfrn169 changed the title Add support for scanner fetch size in storage adapters Add support for scan fetch size in storage adapters Jun 4, 2025
@brfrn169 brfrn169 requested a review from feeblefakie June 4, 2025 08:37
@feeblefakie feeblefakie (Contributor) left a comment

LGTM! Thank you!

@komamitsu (Contributor)

@brfrn169 It seems no integration test case is added that scans more records than the fetch size. How about adding such a test case? Sorry in advance if existing test cases already cover that.

@brfrn169 brfrn169 (Collaborator, Author) commented Jun 4, 2025

It seems no integration test case is added that scans more records than the fetch size. How about adding such a test case? Sorry in advance if existing test cases already cover that.

@komamitsu We have the following tests for large scans:

  @Test
  public void scan_ScanLargeData_ShouldRetrieveExpectedValues()
      throws ExecutionException, IOException {
    int recordCount = 345;

    // Arrange
    Key partitionKey = Key.ofInt(COL_NAME1, 1);
    for (int i = 0; i < recordCount; i++) {
      Key clusteringKey = Key.ofInt(COL_NAME4, i);
      storage.put(
          Put.newBuilder()
              .namespace(namespace)
              .table(TABLE)
              .partitionKey(partitionKey)
              .clusteringKey(clusteringKey)
              .blobValue(COL_NAME6, new byte[getLargeDataSizeInBytes()])
              .build());
    }

    Scan scan =
        Scan.newBuilder().namespace(namespace).table(TABLE).partitionKey(partitionKey).build();

    // Act
    List<Result> results = scanAll(scan);

    // Assert
    assertThat(results.size()).isEqualTo(recordCount);
    for (int i = 0; i < recordCount; i++) {
      assertThat(results.get(i).getInt(COL_NAME1)).isEqualTo(1);
      assertThat(results.get(i).getInt(COL_NAME4)).isEqualTo(i);
    }
  }

  @Test
  public void scan_ScanLargeDataWithOrdering_ShouldRetrieveExpectedValues()
      throws ExecutionException, IOException {
    int recordCount = 345;
    int fetchCount = 234;

    // Arrange
    Key partitionKey = Key.ofInt(COL_NAME1, 1);
    for (int i = 0; i < recordCount; i++) {
      Key clusteringKey = Key.ofInt(COL_NAME4, i);
      storage.put(
          Put.newBuilder()
              .namespace(namespace)
              .table(TABLE)
              .partitionKey(partitionKey)
              .clusteringKey(clusteringKey)
              .blobValue(COL_NAME6, new byte[getLargeDataSizeInBytes()])
              .build());
    }

    Scan scanAsc =
        Scan.newBuilder()
            .namespace(namespace)
            .table(TABLE)
            .partitionKey(partitionKey)
            .ordering(Scan.Ordering.asc(COL_NAME4))
            .build();
    Scan scanDesc =
        Scan.newBuilder()
            .namespace(namespace)
            .table(TABLE)
            .partitionKey(partitionKey)
            .ordering(Scan.Ordering.desc(COL_NAME4))
            .build();

    // Act
    List<Result> resultsAsc = new ArrayList<>();
    try (Scanner scanner = storage.scan(scanAsc)) {
      Iterator<Result> iterator = scanner.iterator();
      for (int i = 0; i < fetchCount; i++) {
        resultsAsc.add(iterator.next());
      }
    }
    List<Result> resultsDesc = new ArrayList<>();
    try (Scanner scanner = storage.scan(scanDesc)) {
      Iterator<Result> iterator = scanner.iterator();
      for (int i = 0; i < fetchCount; i++) {
        resultsDesc.add(iterator.next());
      }
    }

    // Assert
    assertThat(resultsAsc.size()).isEqualTo(fetchCount);
    for (int i = 0; i < fetchCount; i++) {
      assertThat(resultsAsc.get(i).getInt(COL_NAME1)).isEqualTo(1);
      assertThat(resultsAsc.get(i).getInt(COL_NAME4)).isEqualTo(i);
    }
    assertThat(resultsDesc.size()).isEqualTo(fetchCount);
    for (int i = 0; i < fetchCount; i++) {
      assertThat(resultsDesc.get(i).getInt(COL_NAME1)).isEqualTo(1);
      assertThat(resultsDesc.get(i).getInt(COL_NAME4)).isEqualTo(recordCount - i - 1);
    }
  }

  @Test
  public void scan_ScanLargeDataWithLimit_ShouldRetrieveExpectedValues()
      throws ExecutionException, IOException {
    // Arrange
    int recordCount = 345;
    int limit = 234;
    Key partitionKey = Key.ofInt(COL_NAME1, 1);
    for (int i = 0; i < recordCount; i++) {
      Key clusteringKey = Key.ofInt(COL_NAME4, i);
      storage.put(
          Put.newBuilder()
              .namespace(namespace)
              .table(TABLE)
              .partitionKey(partitionKey)
              .clusteringKey(clusteringKey)
              .blobValue(COL_NAME6, new byte[getLargeDataSizeInBytes()])
              .build());
    }

    Scan scan =
        Scan.newBuilder()
            .namespace(namespace)
            .table(TABLE)
            .partitionKey(partitionKey)
            .limit(limit)
            .build();

    // Act
    List<Result> results = scanAll(scan);

    // Assert
    assertThat(results.size()).isEqualTo(limit);
    for (int i = 0; i < limit; i++) {
      assertThat(results.get(i).getInt(COL_NAME1)).isEqualTo(1);
      assertThat(results.get(i).getInt(COL_NAME4)).isEqualTo(i);
    }
  }

@komamitsu komamitsu (Contributor) left a comment

LGTM! 👍

@Torch3333 Torch3333 (Contributor) left a comment

LGTM, thank you!

@thongdk8 thongdk8 (Contributor) left a comment

LGTM. Thank you for the PR. I have also checked and confirmed that the fix works well with the default configuration for all the supported databases.

@brfrn169 brfrn169 (Collaborator, Author) commented Jun 9, 2025

I have also checked and confirmed that the fix works well with the default configuration for all the supported databases.

@Torch3333 Thank you for your confirmation!

@brfrn169 brfrn169 merged commit 4c5686c into master Jun 9, 2025
55 checks passed
@brfrn169 brfrn169 deleted the add-support-for-scanner-fetch-size-in-storage-adapters branch June 9, 2025 07:28